package gecko.aliiot;

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.iot.model.v20180120.RRpcRequest;
import com.aliyuncs.iot.model.v20180120.RRpcResponse;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import com.aliyuncs.utils.Base64Helper;
import gecko.*;
import gecko.lang.JSONObject;
import gecko.lang.TypedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.CompletableFuture;

import static gecko.x.Strings.requireNotEmpty;

/**
 * 阿里云IOT平台设备通讯管道；
 * 为阿里云IOT虚拟设备对象，提供RPC通讯环境；
 *
 * @author 陈永佳 (yoojiachen@gmail.com)
 * @version 0.0.1
 */
public class AliIOTDevicePipeline extends DevicePipeline implements VirtualDevice.FutureTransporter {

    private static final Logger LOGGER = LoggerFactory.getLogger(AliIOTDevicePipeline.class);

    private DefaultAcsClient mAcsClient;
    private String mProductKey = "";

    @Override
    public CompletableFuture<EventFrame> transport(EventFrame event, GeckoScoped scoped, Address address) {
        // 使用阿里云的RPC功能向远程设备发起指令
        return scoped.submitFuture(() -> {
            final RRpcRequest request = new RRpcRequest();
            request.setProductKey(mProductKey);
            request.setRequestBase64Byte(Base64Helper.encode(event.bytes));
            request.setDeviceName(address.unionAddress());
            byte[] output;
            try {
                final RRpcResponse resp = mAcsClient.getAcsResponse(request);
                if (resp.getSuccess()) {
                    output = Base64Helper.decode(resp.getPayloadBase64Byte(), "utf-8").getBytes();
                } else {
                    LOGGER.error("阿里云设备RPC请求失败: {}", resp.getErrorMessage());
                    output = JSONObject.error(resp.getErrorMessage()).toJSONBytes();
                }
            } catch (ClientException ex) {
                LOGGER.error("阿里云设备RPC请求出错", ex);
                output = JSONObject.error(ex.getErrMsg()).toJSONBytes();
            } catch (UnsupportedEncodingException ex) {
                LOGGER.error("阿里云设备RPC响应无法编码UTF8", ex);
                output = JSONObject.error(ex.getMessage()).toJSONBytes();
            }
            return event.newOf(output);
        });
    }

    @Override
    public void onInit(TypedMap initArgs, GeckoScoped scoped) {
        final String accessKey = initArgs.getString("accessKey");
        final String accessSecret = initArgs.getString("accessSecret");
        final String endpoint = initArgs.getString("endpoint");
        final String regionId = initArgs.getString("regionId");
        final String domain = initArgs.getString("domain");
        final String productName = initArgs.getString("productName");
        mProductKey = requireNotEmpty(initArgs.getString("productKey"));
        // 初始化
        try {
            DefaultProfile.addEndpoint(
                    requireNotEmpty(endpoint),
                    requireNotEmpty(regionId),
                    productName,
                    requireNotEmpty(domain));
        } catch (ClientException ex) {
            LOGGER.error("阿里云物联网环境初始化错误", ex);
            throw new RuntimeException(ex);
        }
        IClientProfile profile = DefaultProfile.getProfile(regionId,
                requireNotEmpty(accessKey),
                requireNotEmpty(accessSecret));
        mAcsClient = new DefaultAcsClient(profile);
    }

    @Override
    public void onStart(GeckoScoped scoped) {
        // 绑定Mqtt消息发送接口
        forEach(dev -> ((AbstractDevice) dev).setFutureTransporter(this));
        super.onStart(scoped);
    }

    @Override
    public String getSupportProtocol() {
        return AliIOTVirtualDevice.PROTOCOL;
    }

}
